Fixing hasNext behaviour #1745
Conversation
We previously returned an empty GraphQL response at the end of the response stream to set the `hasNext` field to false, indicating that no more responses will come. That empty response is causing issues in some clients, so 24a00e6 was a fix to set `hasNext` on a deferred response from inside query planner execution, but it does not account for parallel deferred response executions, so one response might come with `hasNext` set to false and then be followed by another one. This commit attempts another solution, where we go through an intermediate task that checks whether the response stream is closed (it is a channel, so it implements `FusedStream`). Unfortunately, right now it fails to recognize when the stream is closed.
let stream = once(ready(first)).chain(rest).boxed();
let (mut sender2, receiver2) = futures::channel::mpsc::channel(10);
Can we find a better name? :p
that's "WiP"
I'll find a better name once I get it working :D
Since the stream is marked as terminated from inside `poll_next`, we need to poll it a second time after getting a message to check whether it is closed, but we cannot do that with an async method, since we need to send the current message ASAP. So we call `try_next` and, depending on its result, we either send the current message; or, if the channel is closed, set `hasNext` on the current message and send it; or send the current message, take the next one, and try again to see whether there is another one after it.
So this solution with channels appears to work when testing manually, but not in integration tests. I am testing another solution using an atomic counter, but it is currently very racy. Some issues I'm encountering here:
I will need #1640 to land first: to make the behaviour more coherent, I will move the response formatting step to the execution service, so the execution service will always return correct responses.
This is now working and can be reviewed (I'll clean up the remaining println calls tomorrow)
receiver
}

async fn consume_responses(
I think there may be a race condition here. In the case where `stream.try_next()` is called and returns an error because there may be more items in the stream, if the stream is then closed before the call to `next` in `filter_stream`, there will be no final empty response with `has_next: false`.
`try_next` is like doing `next` but without await: if there's a message in the stream, it will be returned by `try_next`; if not, `try_next` will return an error and the next call to `next` would wait for it.
In the case you are describing, `try_next` would not return an error if there are in-flight messages. In the case where `try_next` would return an error, then somehow between returning from `consume_responses` and the call to `next` the stream would get new messages and then be closed; `next` would then return a message, some calls to `try_next` would return messages, and when there's nothing remaining `try_next` would return `Ok(None)`.
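The three-way distinction being discussed (ready message / nothing ready yet / stream closed) can be observed with the standard library's synchronous channel, shown here as a stand-in sketch rather than the router's actual `futures` types. Note the mapping differs slightly: `futures`' `try_next` yields `Ok(Some(msg))` for a ready message, `Err(_)` when nothing is ready yet, and `Ok(None)` once the stream is closed, while `std`'s `try_recv` uses `Err(Empty)` and `Err(Disconnected)` for the last two cases.

```rust
use std::sync::mpsc::{channel, TryRecvError};

fn main() {
    let (tx, rx) = channel::<u32>();

    // nothing queued yet, but senders are still alive: "try again later"
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

    tx.send(7).unwrap();
    // a queued message is returned immediately, without blocking
    assert_eq!(rx.try_recv(), Ok(7));

    drop(tx);
    // all senders dropped: the channel reports closure instead of emptiness
    assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
}
```

The key point for the race discussion is the last branch: emptiness and closure are distinct results, so a non-blocking probe can tell "wait for more" apart from "this was the last message".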
The one possible race I worry about is if messages are received and re-sent, then we await on `next`, then for whatever reason the stream is disconnected (maybe all the senders are dropped). Then we would need to add a final `has_next = false` response. But I don't see how this could play out.
@BrynCooke that last case should be addressed by 5210765
LGTM!
There could be a race condition where we consume and send all messages, then await the next one, then the stream is closed and we don't have any message on which to set `has_next`. So we detect that case and add a last message.
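A minimal sketch of that look-ahead-then-patch logic, using the standard library's synchronous channel as a stand-in for the router's async one; the `Response` struct and `consume` function here are hypothetical names for illustration, not the router's actual types:

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
struct Response {
    body: Option<u32>, // stand-in for a GraphQL response payload
    has_next: bool,
}

// Forward every message, peeking ahead with try_recv so the last
// message can carry has_next: false. If the channel closes while we
// are blocked waiting for the next message, append a final empty
// response whose only purpose is to flip has_next to false.
fn consume(rx: Receiver<u32>) -> Vec<Response> {
    let mut out = Vec::new();
    let mut current = match rx.recv() {
        Ok(m) => m,
        Err(_) => return out, // stream closed before any message
    };
    loop {
        match rx.try_recv() {
            // another message is already queued: current is not the last
            Ok(next) => {
                out.push(Response { body: Some(current), has_next: true });
                current = next;
            }
            // channel closed: current is definitely the last message
            Err(TryRecvError::Disconnected) => {
                out.push(Response { body: Some(current), has_next: false });
                return out;
            }
            // nothing queued yet: send current ASAP, then block for the next
            Err(TryRecvError::Empty) => {
                out.push(Response { body: Some(current), has_next: true });
                match rx.recv() {
                    Ok(next) => current = next,
                    Err(_) => {
                        // the race described above: the stream closed while
                        // we waited, after already sending has_next: true,
                        // so add a last body-less message to end the stream
                        out.push(Response { body: None, has_next: false });
                        return out;
                    }
                }
            }
        }
    }
}

fn main() {
    // all messages queued up front: the last real message gets has_next: false
    let (tx, rx) = channel();
    for i in 1..=3 {
        tx.send(i).unwrap();
    }
    drop(tx);
    let out = consume(rx);
    assert_eq!(out.last().unwrap(), &Response { body: Some(3), has_next: false });

    // slow producer: closure is only observed while blocked in recv(),
    // so a final empty has_next: false response is appended
    let (tx, rx) = channel();
    let consumer = thread::spawn(move || consume(rx));
    tx.send(1).unwrap();
    thread::sleep(Duration::from_millis(200));
    drop(tx);
    let out = consumer.join().unwrap();
    assert_eq!(out.last().unwrap(), &Response { body: None, has_next: false });
}
```

The second scenario in `main` exercises the race from the review thread: the producer disappears while the consumer is blocked awaiting the next message, after the previous one already went out with `has_next: true`, so only a trailing empty message can correct the flag.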
We previously returned an empty GraphQL response at the end of the response stream to set the `hasNext` field to false, to indicate that no more responses will come.
That empty response is causing issues in some clients, so 24a00e6 was a fix to set the `hasNext` on a deferred response from inside query planner execution, but it does not account for parallel deferred response executions, so one response might come with `hasNext` set to false and then be followed by another one.
This commit uses another approach, where we go through an intermediate task that checks if the response stream is closed.

Fixes #1687